TaskQueue.js ➔ ... ➔ this.isPaused   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 3
rs 10
1
var timeout = require('./timeout').timeout
2
var Future = require('./Future').Future
3
var Slf4j = require('../logger/Slf4j').Slf4j
4
5
/**
6
 * Designed to be thrown whenever client code tries to push task in
7
 * closed queue
8
 *
9
 * @param {string} message
10
 * @class
11
 */
12
function RejectionException (message) {
13
  this.message = message || 'Task has been rejected'
14
  this.stack = (new Error()).stack
15
}
16
17
RejectionException.prototype = Object.create(Error.prototype)
18
RejectionException.prototype.constructor = RejectionException
19
RejectionException.prototype.name = 'RejectionException'
20
21
/**
22
 * @typedef {Object} TaskQueue~Options
23
 *
24
 * @property {LoggerOptions} logger
25
 * @property {string} [name] Queue name
26
 */
27
28
/**
29
 * @typedef {Object} TaskQueue~TaskOptions
30
 *
31
 * @property {int} [timeout]
32
 * @property {string} [name]
33
 */
34
35
/**
36
 * @typedef {Object} TaskQueue~Task
37
 *
38
 * @property {int} id
39
 * @property {Function} factory
40
 * @property {int} timeout
41
 * @property {Promise.<*>|Thenable.<*>} completion
42
 * @property {string} name
43
 */
44
45
/**
46
 * @typedef {Object} TaskQueue~Statistics
47
 *
48
 * @property {int} enqueued
49
 * @property {int} completed
50
 * @property {int} successful
51
 * @property {int} discarded
52
 * @property {int} rejected
53
 */
54
55
/**
56
 * Task queue that allows sequential task processing.
57
 *
58
 * @class
59
 * @param {TaskQueue~Options} [options]
60
 */
61
function TaskQueue (options) {
62
  options = options || {}
63
64
  var queue = []
65
66
  var enqueued = 0
67
  var completed = 0
68
  var successful = 0
69
  var rejected = 0
70
  var discarded = 0
71
72
  var paused = true
73
  var closed = false
74
  var termination = new Future()
75
  /**
76
   * @type {TaskQueue~Task|null}
77
   */
78
  var current = null
79
80
  var logger = Slf4j.factory(options.logger, 'ama-team.voxengine-sdk.concurrent.task-queue')
81
82
  function setName (name) {
83
    logger.attach('name', name)
84
  }
85
86
  /**
87
   * Sets queue name which will turn up in logs.
88
   *
89
   * @function TaskQueue#setName
90
   *
91
   * @param {string} name
92
   */
93
  this.setName = setName
94
95
  if (options.name) {
96
    setName(options.name)
97
  }
98
99
  /**
100
   * Executed provided task
101
   *
102
   * @param {TaskQueue~Task} task
103
   */
104
  function execute (task) {
105
    try {
106
      return timeout(Promise.resolve(task.factory()), task.timeout)
107
    } catch (e) {
108
      return Promise.reject(e)
109
    }
110
  }
111
112
  /**
113
   * Handler to be run after task has been fulfilled.
114
   *
115
   * @param {*} value
116
   */
117
  function taskFulfillmentHandler (value) {
118
    logger.debug('Task "{}", #{} has completed successfully', current.name,
119
      current.id)
120
    completed++
121
    successful++
122
    current.completion.resolve(value)
123
  }
124
125
  /**
126
   * Handler to be run after task has been rejected
127
   *
128
   * @param {Error|*} error
129
   */
130
  function taskRejectionHandler (error) {
131
    completed++
132
    logger.debug('Task "{}", #{} has rejected with {}', current.name,
133
      current.id, (error && error.name ? error.name : error))
134
    current.completion.reject(error)
135
  }
136
137
  /**
138
   * Cleanup handler to be run after task has been handled
139
   */
140
  function postCompletionHook () {
141
    current = null
142
    if (closed && queue.length === 0) {
143
      termination.resolve()
144
    } else {
145
      proceed()
146
    }
147
  }
148
149
  /**
150
   * Pick up next task for processing, if necessary
151
   */
152
  function proceed () {
153
    if (paused || current || queue.length === 0) {
154
      return
155
    }
156
    current = queue.shift()
157
    logger.debug('Executing task "{}", #{}', current.name, current.id)
158
    execute(current)
159
      .then(taskFulfillmentHandler, taskRejectionHandler)
160
      .then(postCompletionHook)
161
  }
162
163
  /**
164
   * Adds new task to queue.
165
   *
166
   * @param {Function} factory Function representing task execution. It
167
   *   should return Promise if it relies on I/O.
168
   * @param {TaskQueue~TaskOptions} [options]
169
   * @return {Future.<*>}
170
   */
171
  this.push = function (factory, options) {
172
    if (closed) {
173
      rejected++
174
      var error = new RejectionException('Can\'t enqueue task: queue is closed')
175
      return Promise.reject(error)
176
    }
177
    options = options || {}
178
    enqueued++
179
    var task = {
180
      id: enqueued,
181
      name: options.name || 'Task #' + enqueued,
182
      factory: factory,
183
      timeout: options.timeout,
184
      completion: new Future()
185
    }
186
    logger.debug('Registering task "{}", #{}', task.name, task.id)
187
    queue.push(task)
188
    proceed()
189
    return task.completion
190
  }
191
192
  /**
193
   * Start processing
194
   *
195
   * @return {TaskQueue}
196
   */
197
  this.start = function () {
198
    logger.debug('Starting queue processing')
199
    paused = false
200
    proceed()
201
    return this
202
  }
203
204
  /**
205
   * Pause processing until #start() is called.
206
   *
207
   * @return {Promise.<*>|Thenable.<*>}
208
   */
209
  this.pause = function () {
210
    logger.debug('Pausing queue processing')
211
    paused = true
212
    return current ? current.completion : Promise.resolve()
213
  }
214
215
  /**
216
   * @return {boolean}
217
   */
218
  this.isPaused = function () {
219
    return paused
220
  }
221
222
  /**
223
   * @return {boolean}
224
   */
225
  this.isClosed = function () {
226
    return closed
227
  }
228
229
  /**
230
   * @return {Promise.<TaskQueue~Statistics>}
231
   */
232
  function close () {
233
    logger.debug('Shutting down queue')
234
    closed = true
235
    var last = queue.length > 0 ? queue[queue.length - 1] : current
236
    if (last) {
237
      return last.completion.then(getStatistics, getStatistics)
238
    }
239
    return Promise.resolve(getStatistics())
240
  }
241
242
  /**
243
   * Abruptly terminates queue, discarding all tasks in queue
244
   *
245
   * @return {Promise.<TaskQueue~Statistics>}
246
   */
247
  this.terminate = function () {
248
    logger.debug('Terminating queue, discarding awaiting {} tasks',
249
      queue.length)
250
    discarded += queue.length
251
    while (queue.length > 0) {
252
      var task = queue.shift()
253
      logger.trace('Discarding task "{}", #{}', task.name, task.id)
254
    }
255
    return close()
256
  }
257
258
  /**
259
   * Closes queue for processing, waiting for all remaining tasks to
260
   * complete and then resolving returned promise.
261
   *
262
   * @function TaskQueue#close
263
   *
264
   * @return {Promise.<TaskQueue~Statistics>}
265
   */
266
  this.close = close
267
268
  function getStatistics () {
269
    return {
270
      enqueued: enqueued,
271
      completed: completed,
272
      successful: successful,
273
      rejected: rejected,
274
      discarded: discarded
275
    }
276
  }
277
278
  /**
279
   * @function TaskQueue#getStatistics
280
   *
281
   * @return {TaskQueue~Statistics}
282
   */
283
  this.getStatistics = getStatistics
284
285
  /**
286
   * @return {Number}
287
   */
288
  this.getLength = function () {
289
    return queue.length
290
  }
291
}
292
293
/**
294
 * Alternative constructor that starts queue instantly
295
 *
296
 * @param {TaskQueue~Options} options
297
 * @return {TaskQueue}
298
 */
299
TaskQueue.started = function (options) {
300
  return new TaskQueue(options).start()
301
}
302
303
module.exports = {
304
  TaskQueue: TaskQueue,
305
  RejectionException: RejectionException
306
}
307